#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2013 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import base64
import json
import os
import subprocess
import time

import testlib


class TestConnection(testlib.MachineCase):
    def setUp(self):
        """Run the base class setup and record the path of the cockpit-ws executable"""
        super().setUp()
        libexec = self.libexecdir
        self.ws_executable = f"{libexec}/cockpit-ws"

    def ostree_setup_ws(self):
        """Swap the cockpit/ws container for an overlaid cockpit-ws package

        This is for tests that don't work with the container, and to make sure
        that overlaying cockpit-ws works as well. Does nothing on images which
        are not OSTree based, and nothing on bootc images.
        """
        machine = self.machine
        if not machine.ostree_image:
            return
        if "bootc" in machine.image:
            return

        # drop the startup unit of the cockpit/ws container
        machine.execute("rm /etc/systemd/system/cockpit.service")
        # layer the cockpit-ws rpm on top of the image, then reboot
        machine.execute("rpm-ostree install --cache-only /var/tmp/cockpit-ws-*.rpm", timeout=180)
        machine.reboot()

    def assertNoAdminProcessLeaks(self):
        """Fail if cockpit or ssh-agent processes of the admin user stay around

        Polls for up to 30 seconds until no matching process is left.
        """
        machine = self.machine
        # user-wide processes like dbus-broker may legitimately stay around, so only match these two
        list_leaks = "pgrep -au admin '(cockpit|ssh-agent)'"
        try:
            machine.execute(f"while {list_leaks}; do sleep 0.1; done", timeout=30)
        except RuntimeError:
            # put the leaked processes into the failure message
            self.fail(machine.execute(list_leaks))

    def testBasic(self):
        """Run the basic connection checks with the default ws, and on OSTree also with the overlaid rpm"""
        machine = self.machine

        # first round: the default ws install (container on OSTree, package everywhere else)
        self.check_basic_with_start_stop(machine.start_cockpit, machine.stop_cockpit)

        # second round, OSTree only: cockpit-ws from the overlaid rpm
        if machine.ostree_image:
            def start_overlaid_ws():
                # run the packaged cockpit.service with --no-tls through a drop-in
                machine.execute(r"""
                    mkdir -p /etc/systemd/system/cockpit.service.d/
                    printf "[Service]\nExecStart=\n%s --no-tls" `grep ExecStart= /lib/systemd/system/cockpit.service` \
                            > /etc/systemd/system/cockpit.service.d/notls.conf
                    systemctl daemon-reload
                    systemctl start cockpit.socket""")

            def stop_overlaid_ws():
                machine.execute("systemctl stop cockpit cockpit.socket")

            self.ostree_setup_ws()
            # HACK: Getting SELinux errors with just rpm-ostree install; there's a plethora of failures,
            # so just allow them all
            machine.execute("setenforce 0")
            self.allow_journal_messages('audit.*avc:  denied .*')
            self.check_basic_with_start_stop(start_overlaid_ws, stop_overlaid_ws)

    def check_basic_with_start_stop(self, start_cockpit, stop_cockpit):
        """Exercise login, disconnect and reconnect handling around cockpit-ws restarts

        start_cockpit and stop_cockpit are callables without arguments which
        bring the web server under test up and down; both are called several
        times.

        The checks cover: cockpit-ws going away on the login page and on the
        server page, a rejected TCP connection, cookie flags for plain http,
        leaked admin processes after logout, login errors with a disabled
        session socket or a missing cockpit-bridge (only without the ws
        container and not on OSTree), and core dump collection for a crashed
        systemd service.
        """
        m = self.machine
        b = self.browser

        def fill_login_form():
            # wait for the login page and enter the admin credentials, without submitting
            b.wait_visible("#login")
            b.set_val("#login-user-input", "admin")
            b.set_val("#login-password-input", "foobar")

        def fail_login_while_down_then_recover():
            # submitting the filled form while cockpit-ws is down must show a fatal message;
            # after bringing it back up, a fresh login must succeed
            stop_cockpit()
            b.click('#login-button')
            b.wait_text_not('#login-fatal-message', "")
            start_cockpit()
            b.reload()
            fill_login_form()
            b.click('#login-button')
            b.enter_page("/system")

        start_cockpit()

        # take cockpit-ws down on the login page
        b.open("/system")
        fill_login_form()
        fail_login_while_down_then_recover()

        # cookie should not be marked as secure, it's not https
        cookie = b.cookie("cockpit")
        self.assertTrue(cookie["httpOnly"])
        self.assertEqual(cookie["sameSite"], "strict")
        self.assertFalse(cookie["secure"])

        # take cockpit-ws down on the server page
        stop_cockpit()
        b.switch_to_top()
        b.wait_in_text(".curtains-ct h1", "Disconnected")

        start_cockpit()
        b.click("#machine-reconnect")
        fill_login_form()

        # sever the connection on the login page
        fail_login_while_down_then_recover()

        # sever the connection on the server page
        # would be nice to use `firewall-cmd --add-rich-rule`, but firewalld always allows "established" connections
        m.execute("nft add table ip cockpittest")
        m.execute("nft add chain ip cockpittest INPUT '{ type filter hook input priority 0; policy accept; }'")
        m.execute("nft insert rule ip cockpittest INPUT tcp dport 9090 reject")

        try:
            b.switch_to_top()
            with b.wait_timeout(90):
                b.wait_visible(".curtains-ct")

            b.wait_in_text(".curtains-ct h1", "Disconnected")
            b.wait_in_text('.curtains-ct .pf-v5-c-empty-state__body', "Connection has timed out.")
        finally:
            # always remove the reject rule again, also when the waits above fail
            m.execute("nft delete table ip cockpittest")

        b.click("#machine-reconnect")
        b.enter_page("/system")
        b.logout()

        # deleted cookie after logout should not be marked as secure, it's not https
        cookie = b.cookie("cockpit")
        self.assertEqual(cookie["value"], "deleted")
        self.assertTrue(cookie["httpOnly"])
        self.assertFalse(cookie["secure"])

        self.assertNoAdminProcessLeaks()

        if not m.ws_container and not m.ostree_image:  # no cockpit-session or r/o file system
            session_bin = f"{self.libexecdir}/cockpit-session"
            # remember the exact mode, so that the permission damage can be undone precisely
            orig_mode = m.execute(f"stat -c %a {session_bin}").strip()
            m.execute(f"chmod g-x {session_bin}")
            try:
                # damage cockpit-session socket → "Authentication not available"
                m.execute("mv /run/cockpit/session /run/cockpit/session.disabled")
                try:
                    b.open("/system")
                    b.try_login()
                    # .. but the login page does not expose the error very specifically
                    b.wait_in_text('#login-error-title', "Authentication failed")
                finally:
                    m.execute("mv /run/cockpit/session.disabled /run/cockpit/session")
            finally:
                # the next check needs an intact cockpit-session again
                m.execute(f"chmod {orig_mode} {session_bin}")

            self.allow_journal_messages("/run/cockpit/session: couldn't connect.*")

            # pretend cockpit-bridge is not installed, expect specific error message
            m.execute("while B=$(command -v cockpit-bridge); do mv $B ${B}.disabled; done")
            try:
                b.open("/system")
                b.try_login()
                b.wait_in_text('#login-error-message', "Install the cockpit-system package")
            finally:
                # put the bridge back even if the expected message never showed up
                m.execute("while B=$(command -v cockpit-bridge.disabled); do mv $B ${B%.disabled}; done")

        # Lets crash a systemd-controlled process and see if we get a proper backtrace in the logs
        # This helps with debugging failures in the tests elsewhere
        m.write("/run/systemd/system/systemd-hostnamed.service.d/core.conf",
                "[Service]\nLimitCORE=infinity\n")
        m.execute("""systemctl daemon-reload
                     systemctl restart systemd-hostnamed
                     pkill -e -SEGV systemd-hostnam""")
        testlib.wait(lambda: m.execute("journalctl -b | grep 'Process.*systemd-hostnam.*of user.*dumped core.'"))

        # Make sure the core dumps exist in the directory, so we can download them
        cores = m.execute("find /var/lib/systemd/coredump -type f")
        self.assertNotEqual(cores, "")

        # the crash above was deliberate
        self.allow_core_dumps = True
        self.allow_journal_messages(".*org.freedesktop.hostname1.*DBus.Error.NoReply.*")

    @testlib.skipWsContainer("cockpit/ws doesn't use systemd units")
    @testlib.nondestructive
    def testUnitLifecycle(self):
        """Follow cockpit's systemd units through start, stop, restart and kill scenarios

        After each step the activity of cockpit.socket, the http instance
        units, the https factory socket and the number of https instance units
        is compared against the expected state.
        """
        machine = self.machine

        http_head = "curl --silent --head http://127.0.0.1:9090"
        https_head = "curl --silent -k --head https://127.0.0.1:9090"

        def assert_head_ok(curl_command):
            # the request must be answered with a 200 status line
            self.assertIn("HTTP/1.1 200 OK", machine.execute(curl_command))

        def check_unit(unit, want_active):
            state = machine.execute(f"systemctl is-active {unit} || true").strip()
            # any other state (failed, activating, ...) is wrong either way
            self.assertIn(state, ["active", "inactive"])
            if want_active:
                wanted, complaint = "active", f"{unit} is not active"
            else:
                wanted, complaint = "inactive", f"{unit} is active"
            self.assertEqual(state, wanted, complaint)

        def check_units(ws_socket, instance_sockets, http_instances, https_instances=0):
            check_unit("cockpit.socket", ws_socket)
            # http instances
            for name in ["http"]:
                check_unit(f"cockpit-wsinstance-{name}.socket", instance_sockets)
                check_unit(f"cockpit-wsinstance-{name}.service", name in http_instances)
            # https instances are counted, for both their services and their sockets
            check_unit("cockpit-wsinstance-https-factory.socket", instance_sockets)
            for unit_type in ["service", "socket"]:
                listing = machine.execute(
                    f"systemctl --no-legend -t {unit_type} list-units cockpit-wsinstance-https@*")
                self.assertEqual(len(listing.strip().splitlines()), https_instances, listing)
            # instance sockets get cleaned up when stopping cockpit.socket or the instances
            if not (ws_socket and instance_sockets):
                leftovers = machine.execute(
                    "test ! -e /run/cockpit/wsinstance/ || ls /run/cockpit/wsinstance/")
                self.assertEqual(leftovers.strip(), "")

        # the four unit states which this test moves between
        def expect_all_down():
            check_units(ws_socket=False, instance_sockets=False, http_instances=[])

        def expect_socket_only():
            check_units(ws_socket=True, instance_sockets=False, http_instances=[])

        def expect_http_instance():
            check_units(ws_socket=True, instance_sockets=True, http_instances=["http"])

        def expect_https_instance():
            check_units(ws_socket=True, instance_sockets=True, http_instances=["http"], https_instances=1)

        # at the beginning, no cockpit related units are running
        machine.execute("systemctl reset-failed")
        machine.stop_cockpit()
        expect_all_down()

        # http only mode

        machine.start_cockpit(tls=False)
        expect_socket_only()
        assert_head_ok(http_head)

        expect_http_instance()
        with self.assertRaises(subprocess.CalledProcessError):
            machine.execute("curl --silent https://127.0.0.1:9090")
        # c-tls knows it can't do https, and does not activate that instance
        expect_http_instance()

        machine.restart_cockpit()
        expect_http_instance()

        machine.stop_cockpit()
        expect_all_down()

        # cleans up also when cockpit-tls crashes or idle-exits, not just by explicit stop request
        machine.start_cockpit(tls=False)
        assert_head_ok(http_head)
        machine.execute("pkill -e cockpit-tls")
        expect_socket_only()

        # and recovers from that
        assert_head_ok(http_head)
        expect_http_instance()

        # cleans up instances when killing the DynamicUser helper unit
        machine.start_cockpit(tls=False)
        assert_head_ok(http_head)
        expect_http_instance()
        machine.execute("systemctl stop cockpit-wsinstance-socket-user")
        expect_socket_only()

        # https mode

        machine.start_cockpit(tls=True)
        expect_socket_only()

        assert_head_ok(http_head)

        expect_http_instance()
        assert_head_ok(https_head)
        expect_https_instance()

        machine.restart_cockpit()
        expect_https_instance()

        machine.stop_cockpit()
        expect_all_down()

        machine.start_cockpit(tls=True)
        expect_socket_only()
        assert_head_ok(http_head)

        expect_http_instance()
        assert_head_ok(https_head)
        expect_https_instance()

        # cleans up also when cockpit-tls crashes or idle-exits, not just by explicit stop request
        machine.execute("pkill -e cockpit-tls")
        expect_socket_only()
        assert_head_ok(http_head)
        expect_http_instance()
        # next https request after crash doesn't leak an instance
        assert_head_ok(https_head)
        expect_https_instance()

        # instance service+socket going away does not confuse cockpit-tls' bookkeeping
        machine.execute("systemctl stop cockpit-wsinstance-https@*.service cockpit-wsinstance-https@*.socket")
        expect_http_instance()
        assert_head_ok("curl --silent --show-error -k --head https://127.0.0.1:9090")
        expect_https_instance()

        # sockets are inaccessible to users, only to cockpit-tls
        for sock in ["http.sock", "https-factory.sock"]:
            refusal = machine.execute(f"su -c '! nc -U /run/cockpit/wsinstance/{sock} 2>&1 || exit 1' admin")
            self.assertIn("Permission denied", refusal)

    @testlib.skipWsContainer("cockpit/ws doesn't use systemd units")
    @testlib.nondestructive
    def testHttpsInstanceDoS(self):
        """Check that the number of https instances is bounded, and that new ones work after freeing some"""
        machine = self.machine

        # prevent generating core dump artifacts; the previous pattern is put back on cleanup
        previous_pattern = machine.execute("cat /proc/sys/kernel/core_pattern").strip()
        machine.execute("echo core > /proc/sys/kernel/core_pattern")
        self.addCleanup(machine.execute, f"echo '{previous_pattern}' > /proc/sys/kernel/core_pattern")
        self.addCleanup(machine.execute, "systemctl reset-failed")
        machine.start_cockpit(tls=True)

        # some netcat versions need an explicit shutdown option, others default to shutting down and don't have -N
        if "-N" in machine.execute("nc -h 2>&1"):
            n_opt = "-N"
        else:
            n_opt = ""

        self.assertIn("HTTP/1.1 200 OK", machine.execute("curl --silent -k --head https://127.0.0.1:9090"))

        # number of https instances is bounded (DoS prevention)
        # with MaxTasks=200 and 2 threads per ws instance we should have a
        # rough limit of 100 instances, so at some point curl should start failing
        machine.execute(
            "runuser -u cockpit-wsinstance-socket -- sh -ec 'RC=1; for i in `seq 120`; do "
            f"  echo -n $i | nc {n_opt} -U /run/cockpit/wsinstance/https-factory.sock;"
            "  curl --silent --head --max-time 5 --unix-socket /run/cockpit/wsinstance/https@$i.sock http://dummy > /dev/null || RC=0; "
            "done; exit $RC'")

        for unit_type in ["socket", "service"]:
            count_output = machine.execute(
                f"systemctl --no-legend list-units -t {unit_type} --state=active "
                "'cockpit-wsinstance-https@*' | wc -l")
            active = int(count_output.strip())
            self.assertGreater(active, 45)
            self.assertLess(active, 110)

        failed_output = machine.execute(
            "systemctl --no-legend list-units --state=failed 'cockpit-wsinstance-https@*' | wc -l")
        failed = int(failed_output.strip())
        self.assertGreater(failed, 0)
        self.assertLess(failed, 75)  # services and sockets

        for pattern in [".*cockpit-ws.*dumped core.*",
                        ".*Error creating thread: Resource temporarily unavailable.*",
                        ".*Failed to check for ssh: Failed to fork.*"]:
            self.allow_journal_messages(pattern)

        # initial instance still works
        self.assertIn("HTTP/1.1 200 OK",
                      machine.execute("curl --silent --show-error -k --head https://127.0.0.1:9090"))

        # can launch new instances after freeing up some old ones
        machine.execute(
            "systemctl stop cockpit-wsinstance-https@30 cockpit-wsinstance-https@31 cockpit-wsinstance-https@32")
        machine.execute(f"echo -n new | nc {n_opt} -U /run/cockpit/wsinstance/https-factory.sock")
        reply = machine.execute(
            "curl --silent --show-error --head --unix-socket /run/cockpit/wsinstance/https@new.sock http://dummy")
        self.assertIn("HTTP/1.1 200 OK", reply)

    @testlib.nondestructive
    @testlib.skipBeiboot("beiboot doesn't have certs on the host")
    def testTls(self):
        """Check TLS setup, certificate handling and security related response headers

        Covers certificate generation and its extensions, rejection of SSLv3
        and RC4, error reporting for broken certificates/keys, a missing
        certificate directory and a read-only config directory, certificate
        chains, certmonger certificates (on images which have it), CSP, CORS,
        CORP and frame headers, cookie flags over https, and the http → https
        redirect.
        """
        m = self.machine
        b = self.browser

        # Start Cockpit with TLS, force cert regeneration
        m.execute("rm -f /etc/cockpit/ws-certs.d/*")
        m.start_cockpit(tls=True)

        # A normal TLS connection works
        output = m.execute('openssl s_client -connect 172.27.0.15:9090 2>&1')
        m.message(output)
        self.assertIn("DONE", output)

        # has proper keyUsage and SAN (both with sscg and with self-signed)
        output = m.execute("openssl s_client -showcerts -connect 172.27.0.15:9090 |"
                           "openssl x509 -noout -ext keyUsage,extendedKeyUsage,subjectAltName")
        # keyUsage
        self.assertIn("Digital Signature", output)
        self.assertIn("Key Encipherment", output)
        # extendedKeyUsage
        self.assertIn("TLS Web Server Authentication", output)
        # SAN
        self.assertIn("IP Address:127.0.0.1", output)
        self.assertIn("DNS:localhost", output)

        # SSLv3 should not work
        output = m.execute('openssl s_client -connect 172.27.0.15:9090 -ssl3 2>&1 || true')
        self.assertNotIn("DONE", output)

        # Some operating systems fail SSL3 on the server side
        self.assertRegex(output, "Secure Renegotiation IS NOT supported|"
                         "ssl handshake failure|"
                         "[uU]nknown option.* -ssl3|"
                         "null ssl method passed|"
                         "wrong version number")

        # RC4 should not work
        output = m.execute('! openssl s_client -connect 172.27.0.15:9090 -tls1_2 -cipher RC4 2>&1')
        self.assertNotIn("DONE", output)
        self.assertRegex(
            output, r"no cipher match|no ciphers available|ssl handshake failure|Cipher is \(NONE\)")

        # fails with useful error message with broken certs/keys
        if not m.ostree_image:  # not using systemd unit
            def check_fail(expected: list[str]):
                # the TLS connection must fail, and the failed unit's status must show all expected strings
                m.start_cockpit(tls=True)
                m.execute('! openssl s_client -connect 172.27.0.15:9090 2>&1')
                out = m.execute("! systemctl status cockpit.service")
                self.assertIn("failed", out)
                for s in expected:
                    self.assertIn(s, out)

            # missing key
            orig_key = m.execute("cat /etc/cockpit/ws-certs.d/0-self-signed.key")
            m.execute("mv /etc/cockpit/ws-certs.d/0-self-signed.key /etc/cockpit/ws-certs.d/key.bak")
            check_fail(["0-self-signed.key", "No such file or directory"])
            # broken key
            m.write("/etc/cockpit/ws-certs.d/0-self-signed.key", "-----BEGIN PRIVATE KEY-----\nnope")
            check_fail(["/etc/cockpit/ws-certs.d/0-self-signed.cert/.key", "decoding error"])
            m.execute("mv /etc/cockpit/ws-certs.d/key.bak /etc/cockpit/ws-certs.d/0-self-signed.key")
            # missing cert; does not stomp over the existing key
            m.execute("rm /etc/cockpit/ws-certs.d/0-self-signed.cert")
            check_fail(["Found 0-self-signed.key but no 0-self-signed.cert", "Please remove the key file first"])
            self.assertEqual(m.execute("cat /etc/cockpit/ws-certs.d/0-self-signed.key"), orig_key)
            # broken cert
            m.write("/etc/cockpit/ws-certs.d/0-self-signed.cert", "-----BEGIN CERTIFICATE-----\nnope")
            check_fail(["/etc/cockpit/ws-certs.d/0-self-signed.cert/.key", "header error"])

        # gets along with missing directory
        m.execute("rm -r /etc/cockpit/ws-certs.d")
        m.start_cockpit(tls=True)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl -k --head https://127.0.0.1:9090"))

        # get along with read-only config directory, as long as certificate exists
        if not m.ws_container:
            m.stop_cockpit()
            try:
                m.execute("mount -o bind -r /etc/cockpit /etc/cockpit")
                m.start_cockpit(tls=True)
                self.assertIn("HTTP/1.1 200 OK", m.execute("curl -k --head https://127.0.0.1:9090"))
            finally:
                m.execute("umount /etc/cockpit")

        # Install a certificate chain
        m.upload(["verify/files/cert-chain.cert", "verify/files/cert-chain.key"], "/etc/cockpit/ws-certs.d")

        def check_cert_chain():
            # This should also reset the file context
            m.restart_cockpit()
            output = m.execute('openssl s_client -connect 172.27.0.15:9090 2>&1')
            self.assertIn("DONE", output)
            self.assertRegex(output, "s:/?CN *= *localhost")
            self.assertRegex(output, "1 s:/?OU *= *Intermediate")

        check_cert_chain()

        # *.crt file also works
        m.execute("mv /etc/cockpit/ws-certs.d/cert-chain.cert /etc/cockpit/ws-certs.d/cert-chain.crt")
        check_cert_chain()

        # backwards compat: merged cert+key file also still works with cockpit-tls
        # (but not any more with cockpit-ws/container)
        if not m.ws_container:
            m.execute("""cat /etc/cockpit/ws-certs.d/cert-chain.key >> /etc/cockpit/ws-certs.d/cert-chain.crt
                         chmod 640 /etc/cockpit/ws-certs.d/cert-chain.crt
                         rm /etc/cockpit/ws-certs.d/cert-chain.key""")
            check_cert_chain()

        # certmonger generated certificate; asciibetically later than the above
        # not all images have certmonger
        if m.image not in ["debian-stable", "debian-testing", "fedora-coreos", "arch", "centos-9-bootc"]:
            hostname = m.execute("hostname --fqdn").strip()
            m.execute(f"getcert request -f /etc/cockpit/ws-certs.d/monger.cert -k /etc/cockpit/ws-certs.d/monger.key -D {hostname} --ca=local --wait")
            self.addCleanup(m.execute, "getcert stop-tracking -f /etc/cockpit/ws-certs.d/monger.cert")
            # cert generation succeeded, and it is being tracked
            self.assertIn("MONITORING", m.execute("getcert list"))
            self.assertIn("/etc/cockpit/ws-certs.d/monger.cert",
                          m.execute(f"{self.libexecdir}/cockpit-certificate-ensure --check"))
            m.restart_cockpit()
            output = m.execute('openssl s_client -connect 172.27.0.15:9090 2>&1')
            self.assertIn("DONE", output)
            self.assertRegex(output, f"CN.*=.*{hostname}")
            self.assertRegex(output, "i:/?CN.*=.*Local Signing Authority.*")

        # login handler: correct password
        credentials = base64.b64encode(b"admin:foobar").decode()
        m.execute("curl -k -c cockpit.jar -s --head "
                  f"--header 'Authorization: Basic {credentials}' https://127.0.0.1:9090/cockpit/login")
        headers = m.execute("curl -k --head -b cockpit.jar -s https://127.0.0.1:9090/")

        # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy/connect-src
        #
        #    Note: connect-src 'self' does not resolve to websocket
        #    schemes in all browsers, more info in this issue.
        #
        # https://github.com/w3c/webappsec-csp/issues/7
        #
        # Make sure we send the explicit `wss://` item until we're absolutely
        # sure about the browser support situation for `connect-src 'self';`.
        self.assertIn("wss://127.0.0.1:9090", headers)

        # We want to make sure we're *not* sending any CORS headers.
        CORS_HEADERS = [
            # https://en.wikipedia.org/wiki/Cross-origin_resource_sharing#Response_headers
            'Access-Control-Allow-Origin',
            'Access-Control-Allow-Credentials',
            'Access-Control-Expose-Headers',
            'Access-Control-Max-Age',
            'Access-Control-Allow-Methods',
            'Access-Control-Allow-Headers',
        ]
        for cors_header in CORS_HEADERS:
            self.assertNotIn(cors_header, headers)

        # CORP and Frame-Options are also set for dynamic paths
        self.assertIn("Cross-Origin-Resource-Policy: same-origin", headers)
        self.assertIn("X-Frame-Options: sameorigin", headers)

        # the failed handshakes above are deliberate
        self.allow_journal_messages(
            ".*Peer failed to perform TLS handshake",
            ".*Peer sent fatal TLS alert:.*",
            ".*invalid base64 data in Basic header",
            "Received unexpected TLS connection and no certificate was configured",
            ".*Error performing TLS handshake: No supported cipher suites have been found.",
            ".*Error performing TLS handshake: Could not negotiate a supported cipher suite.")

        # check the Debian smoke test
        m.upload(["../tools/debian/tests/smoke"], "/tmp")
        m.execute("/tmp/smoke")

        self.login_and_go("/system", tls=True)
        cookie = b.cookie("cockpit")
        # cookie should be marked as secure
        self.assertTrue(cookie["httpOnly"])
        self.assertTrue(cookie["secure"])
        self.assertEqual(cookie["sameSite"], "strict")
        # same after logout
        b.logout()
        cookie = b.cookie("cockpit")
        self.assertEqual(cookie["value"], "deleted")
        self.assertTrue(cookie["httpOnly"])
        self.assertTrue(cookie["secure"])
        self.assertEqual(cookie["sameSite"], "strict")

        # http on localhost should not redirect to https
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --head http://127.0.0.1:9090"))
        # http on other IP should redirect to https
        output = m.execute("curl --head http://172.27.0.15:9090")
        self.assertIn("HTTP/1.1 301 Moved Permanently", output)
        self.assertIn("Location: https://172.27.0.15:9090/", output)
        # enable AllowUnencrypted, this disables redirect
        m.write("/etc/cockpit/cockpit.conf", "[WebService]\nAllowUnencrypted=true")
        m.restart_cockpit()
        # now it should not redirect
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --head http://127.0.0.1:9090"))
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --head http://172.27.0.15:9090"))

    @testlib.nondestructive
    def testConfigOrigins(self):
        """Check the websocket reply for a request from an origin listed in cockpit.conf

        With "Origins" configured, an upgrade request carrying one of the
        listed origins must be answered with "no-session" on both socket paths.
        """
        machine = self.machine
        machine.write("/etc/cockpit/cockpit.conf",
                      "[WebService]\nOrigins = http://other-origin:9090 http://localhost:9090")
        machine.start_cockpit()

        upgrade_headers = {
            'Connection': 'Upgrade',
            'Upgrade': 'websocket',
            'Host': 'localhost:9090',
            'Origin': 'http://other-origin:9090',
            'Sec-Websocket-Key': '3sc2c9IzwRUc3BlSIYwtSA==',
            'Sec-Websocket-Version': 13
        }

        # the socket answers at /cockpit/socket, and should also answer at /socket
        for path in ["/cockpit/socket", "/socket"]:
            reply = machine.curl('-f', '-N', f'http://localhost:9090{path}', headers=upgrade_headers)
            self.assertIn('"no-session"', reply)

        self.allow_journal_messages('peer did not close io when expected')

    @testlib.skipWsContainer("cockpit/ws doesn't have certs on the host")
    @testlib.nondestructive
    def test100YearsCert(self):
        """Check that a self-signed certificate with a far-future expiry gets replaced

        A certificate valid for 100 years is generated with a patched helper;
        with the helper restored, cockpit-certificate-ensure must create a new
        certificate once, and keep that one on the following run.
        """
        machine = self.machine

        selfsign = '/etc/cockpit/ws-certs.d/0-self-signed.cert'
        helper = f'{self.libexecdir}/cockpit-certificate-helper'
        ensure = f'{self.libexecdir}/cockpit-certificate-ensure'
        one_day = 24 * 60 * 60

        def cert_expiry():
            # Expiry of the self-signed certificate in seconds since the epoch. The date format
            # which OpenSSL prints is not trivial to parse in Python (and there is the question
            # of locales), so `date` on the machine does the conversion.
            return int(machine.execute(
                f'date -d "$(openssl x509 -enddate -noout < {selfsign} | cut -f2 -d=)" +%s'))

        def rerun_ensure(newer_check):
            # run `ensure` and then newer_check, which compares the certificate against a timestamp file
            machine.execute('touch /tmp/timestamp')
            machine.execute(ensure)
            machine.execute(newer_check)
            machine.execute('rm /tmp/timestamp')

        # Ensure things are as we expect them to be
        machine.execute('rm -f /etc/cockpit/ws-certs.d/*')
        machine.execute(f'grep DAYS=395 {helper}')

        # Generate a 100 years expiry certificate
        self.sed_file('s/DAYS=395/DAYS=36500/', helper)
        machine.execute(f'grep DAYS=36500 {helper}')  # double-check
        machine.execute(ensure)

        # Verify the expiry date to be in the far-future
        self.assertGreater(cert_expiry(), time.time() + 99 * 365 * one_day)

        # Put things back: avoid problematic multiple invocations of .sed_file()
        machine.execute(f'sed -i s/DAYS=36500/DAYS=395/ {helper}')
        machine.execute(f'grep DAYS=395 {helper}')  # double-check

        # Run ensure again and make sure we get a new certificate
        rerun_ensure(f'test {selfsign} -nt /tmp/timestamp')

        # Check that the expiry is less than 420 days
        # See https://github.com/sgallagher/sscg/pull/28 for why 420
        self.assertLess(cert_expiry(), time.time() + 420 * one_day)

        # Run ensure again and make sure we *don't* get a new certificate
        rerun_ensure(f'test ! {selfsign} -nt /tmp/timestamp')

    @testlib.skipWsContainer("cockpit/ws doesn't use systemd units")
    @testlib.nondestructive
    def testSocket(self):
        """Check issue/motd contents and listening addresses for cockpit.socket

        The issue and motd files must point to `systemctl` while cockpit.socket
        is stopped and to the listening port while it is running, for the
        default port as well as for a customized ListenStream configuration.
        Afterwards cockpit must answer on the configured port and unix socket,
        and connections to the default port 9090 must fail.
        """
        m = self.machine

        # non-admin user
        m.execute("useradd bob")

        # enable no-password login for 'admin' and 'bob'
        m.execute("passwd -d admin")
        m.execute("passwd -d bob")

        self.sed_file('$ a\\\nPermitEmptyPasswords yes', '/etc/ssh/sshd_config',
                      self.restart_sshd)

        def assertInOrNot(string: str, result: str, *, expected: bool) -> None:
            # assert presence or absence of string in result, depending on expected
            if expected:
                self.assertIn(string, result)
            else:
                self.assertNotIn(string, result)

        def checkMotdForUser(string: str, user: str, *, expected: bool) -> None:
            # the output of a command-less ssh login is what the user gets to see as motd
            result = m.execute(f"ssh -o StrictHostKeyChecking=no -n {user}@localhost")
            assertInOrNot(string, result, expected=expected)

        def checkMotdContent(string: str, *, expected: bool = True) -> None:
            # Needs https://github.com/linux-pam/linux-pam/pull/292 (or PAM 1.5.0)
            old_pam = m.image.startswith('rhel-8-') or m.image in ['ubuntu-2204']

            # check issue (should be exactly the same as motd)
            assertInOrNot(string, m.execute("cat /etc/issue.d/cockpit.issue"), expected=expected)

            # check motd as 'root' (via cat) and 'admin' and 'bob' (via ssh)
            assertInOrNot(string, m.execute("cat /etc/motd.d/cockpit"), expected=expected)
            checkMotdForUser(string, expected=expected, user='admin')
            # the non-admin user only gets to see the motd with an old PAM
            checkMotdForUser(string, expected=old_pam and expected, user='bob')

        m.stop_cockpit()
        checkMotdContent('systemctl')
        checkMotdContent(':9090/', expected=False)
        m.start_cockpit()

        checkMotdContent(':9090/')
        checkMotdContent('systemctl', expected=False)
        m.execute("systemctl stop cockpit.socket")

        # Change port according to documentation: https://cockpit-project.org/guide/latest/listen.html
        m.execute('! selinuxenabled || semanage port -m -t websm_port_t -p tcp 443')
        self.write_file("/etc/systemd/system/cockpit.socket.d/listen.conf",
                        "[Socket]\nListenStream=\nListenStream=/run/cockpit/sock\nListenStream=443",
                        post_restore_action="systemctl stop cockpit.socket; systemctl daemon-reload")

        checkMotdContent('systemctl')
        checkMotdContent(':9090/', expected=False)
        checkMotdContent(':443/', expected=False)
        m.start_cockpit(tls=True)

        checkMotdContent('systemctl', expected=False)
        checkMotdContent(':9090/', expected=False)
        checkMotdContent(':443/')

        output = m.execute('curl -k https://localhost 2>&1 || true')
        self.assertIn('Loading...', output)
        output = m.execute('curl -k --unix-socket /run/cockpit/sock https://dummy 2>&1 || true')
        self.assertIn('Loading...', output)

        # Nothing listens on the default port any more: curl must print nothing and exit with
        # code 7 ("failed to connect"). Compare exactly; a substring check would also accept
        # any served page which happens to contain a "7".
        output = m.execute('curl -k https://localhost:9090 2>/dev/null || echo $?')
        self.assertEqual(output.strip(), '7')

        self.allow_journal_messages(".*Peer failed to perform TLS handshake")

    @testlib.skipWsContainer("no cockpit-ws package with cockpit/ws container")
    def testWsPackage(self):
        # Exercise upgrade, removal and fresh installation of the cockpit-ws package and
        # check what happens to the motd/issue links and /etc/cockpit/disallowed-users.
        machine = self.machine
        image = machine.image
        is_deb = image.startswith(("debian", "ubuntu"))
        is_rhel = image.startswith("rhel")
        is_arch = image == "arch"

        # On RHEL-8 SSH allows password root login by default, so Cockpit does too.
        root_login_enabled_images = ('rhel-8',)
        root_login_disallowed = not image.startswith(root_login_enabled_images)

        if is_deb:
            # clean up debug symbols, they get in the way of upgrading
            machine.execute("dpkg --purge cockpit-ws-dbgsym")
        elif is_rhel:
            # subscription-manager-cockpit depends on cockpit-ws, and cockpit.rpm metapackage depends on sub-man on RHEL
            machine.execute("rpm --erase cockpit subscription-manager-cockpit")

        def install():
            """Install (or upgrade to) the package(s) found in /var/tmp/build."""
            if is_deb:
                machine.execute("dpkg --install /var/tmp/build/cockpit-ws_*.deb")
                return
            if is_arch:
                machine.execute("pacman -U --noconfirm /var/tmp/build/cockpit-*.pkg.tar.zst")
                return
            # rpm: verify a possibly installed package before and the new one after the upgrade
            machine.execute("if rpm -q cockpit-ws; then rpm --verify cockpit-ws; fi")
            machine.execute("rpm --upgrade --force /var/tmp/build/cockpit-ws-*.rpm")
            machine.execute("rpm --verify cockpit-ws")

        def remove():
            """Uninstall the package with the image's native package manager."""
            if is_deb:
                command = "dpkg --purge cockpit cockpit-ws"
            elif is_arch:
                command = "pacman -Rdd --noconfirm cockpit"
            elif is_rhel:
                command = "rpm --erase cockpit-ws"
            else:
                command = "rpm --erase cockpit cockpit-ws"
            machine.execute(command)

        def cat(path):
            """Return the content of `path` on the machine."""
            return machine.execute(f"cat {path}")

        def assert_activation_hint():
            """Both the motd and the issue file must carry the activation hint."""
            for path in ("/etc/motd.d/cockpit", "/etc/issue.d/cockpit.issue"):
                self.assertIn('Activate the web console', cat(path))

        # clean install sets up dynamic motd/issue symlink and pam disallowed users.
        assert_activation_hint()
        if root_login_disallowed:
            self.assertIn('root', cat("/etc/cockpit/disallowed-users"))
        # remove disallowed-users to simulate an upgrade where we did not have this file yet.
        machine.execute("rm /etc/cockpit/disallowed-users")

        # package upgrade keeps them
        install()
        assert_activation_hint()
        # disallowed-users should not exist now as this is an upgrade, except on Arch
        if is_arch:
            machine.execute("test -e /etc/cockpit/disallowed-users")
        else:
            machine.execute("test ! -e /etc/cockpit/disallowed-users")
            machine.execute("echo 'root' > /etc/cockpit/disallowed-users")

        # HACK: On Arch Linux the symlink is overwritten, bug?
        if not is_arch:
            # manual change/removal is respected on upgrade
            machine.execute("ln -sf /dev/null /etc/motd.d/cockpit; rm /etc/issue.d/cockpit.issue")
            install()
            motd_target = machine.execute("readlink /etc/motd.d/cockpit").strip()
            self.assertEqual(motd_target, "/dev/null")
            machine.execute("test ! -e /etc/issue.d/cockpit.issue")

        # removing the package cleans up the links
        remove()
        for leftover in ("/etc/motd.d/cockpit", "/etc/issue.d/cockpit.issue", "/etc/cockpit/disallowed-users"):
            machine.execute(f"test ! -e {leftover}")

        # fresh install (most of our test images have cockpit-ws preinstalled, so the first test above does not cover that)
        install()
        # verify that we installed relative links in the new package
        for link in ("/etc/motd.d/cockpit", "/etc/issue.d/cockpit.issue"):
            link_target = machine.execute(f"readlink {link}").strip()
            self.assertEqual(link_target, "../../run/cockpit/issue")
        if root_login_disallowed:
            self.assertIn('root', cat("/etc/cockpit/disallowed-users"))

    @testlib.skipWsContainer("no cockpit-ws binary with cockpit/ws container")
    @testlib.nondestructive
    def testCommandline(self):
        # Run cockpit-ws both through cockpit-tls and directly from the command line
        # (--port/--address), and check large requests, HTTP methods and robots.txt.
        m = self.machine

        # Large requests are processed correctly with plain HTTP through cockpit-tls
        m.start_cockpit(tls=True)
        # the Authorization value is the number 1 zero-padded to 7000 digits
        large_headers = {'Authorization': f'Negotiate {1:07000}'}
        self.assertIn('id="login"', m.curl('http://localhost:9090/', headers=large_headers))

        # Large requests are processed correctly with TLS through cockpit-tls
        self.assertIn('id="login"', m.curl('-k', 'https://localhost:9090/', headers=large_headers))
        m.stop_cockpit()

        # start from scratch: no certificates, and a config which only sets a custom title
        m.execute("rm -f /etc/cockpit/ws-certs.d/* /etc/cockpit/cockpit.conf")
        m.write("/etc/cockpit/cockpit.conf", "[WebService]\nLoginTitle = A Custom Title\n")

        m.execute(f"{self.libexecdir}/cockpit-certificate-ensure")
        self.assertTrue(m.execute("ls /etc/cockpit/ws-certs.d/*"))

        pid = m.spawn(f"{self.ws_executable} --port 9000 --address 127.0.0.1", "cockpit-ws.log")
        self.addCleanup(m.execute, f"kill {pid}")

        # The port may not be available immediately, so wait for it
        testlib.wait(lambda: 'A Custom Title' in m.curl('-k', 'https://localhost:9000/'))

        # ws was started with --address 127.0.0.1, so connecting to the other address must
        # fail. stderr is merged into the output, hence only the last line (the echoed exit
        # status) is compared: it has to be exactly 7, curl's "failed to connect". A
        # substring check for '7' would also pass on a successfully fetched page.
        output = m.execute('curl -s -S -k https://172.27.0.15:9000/ 2>&1 || echo $?')
        exit_status = output.strip().rsplit('\n', 1)[-1]
        self.assertEqual(exit_status, '7')

        # Large requests are processed correctly with plain HTTP
        self.assertIn('A Custom Title', m.curl('http://localhost:9000/', headers=large_headers))

        # Large requests are processed correctly with TLS
        self.assertIn('A Custom Title', m.curl('-k', 'https://localhost:9000/', headers=large_headers))

        # unsupported HTTP method
        self.assertIn("HTTP/1.1 405 Method Not Allowed",
                      m.execute('curl -k --verbose -X PATCH https://localhost:9000/ 2>&1'))

        # no body with HEAD request
        out = m.execute('curl -k --head https://localhost:9000/')
        self.assertIn("HTTP/1.1 200", out)
        self.assertNotIn("<html>", out)

        # robots.txt from any directory and with/without TLS, and without auth
        expected = "User-agent: *\nDisallow: /\n"
        self.assertEqual(m.execute("curl http://localhost:9000/robots.txt"), expected)
        self.assertEqual(m.execute("curl http://localhost:9000/somedir/robots.txt"), expected)
        self.assertEqual(m.execute("curl -k https://localhost:9000/robots.txt"), expected)

    @testlib.nondestructive
    def testHeadRequest(self):
        # Send HEAD requests to the individual cockpit-ws handlers and inspect the
        # response headers only.
        machine = self.machine
        machine.start_cockpit()

        def head(path, login=None):
            """Return the output of `curl -s --head` for `path`.

            `login` is an optional b"user:password" value which gets sent as
            HTTP Basic authorization.
            """
            url = f"http://172.27.0.15:9090{path}"
            if login is None:
                return machine.execute(f"curl -s --head {url}")
            token = base64.b64encode(login).decode()
            return machine.execute(f"curl -s --head --header 'Authorization: Basic {token}' {url}")

        # static handler
        response = head("/cockpit/static/login.html")
        for expected_header in ("HTTP/1.1 200 OK\r\n",
                                "Content-Type: text/html\r\n",
                                "Cross-Origin-Resource-Policy: same-origin\r\n",
                                "X-Frame-Options: sameorigin\r\n"):
            self.assertIn(expected_header, response)
        # login.html is not always accessible as a file (e.g. on CoreOS), so just assert a reasonable content length
        self.assertIn("Content-Length: ", response)
        after_name = response.partition('Content-Length: ')[2]
        length = int(after_name.split()[0])
        self.assertGreater(length, 5000)
        self.assertLess(length, 100000)

        # login handler: wrong password
        response = head("/cockpit/login", login=b"admin:hahawrong")
        self.assertRegex(response, r"HTTP/1.1 (401 Authentication failed|403 Permission denied)\r\n")
        self.assertNotIn("Set-Cookie:", response)

        # login handler: correct password
        response = head("/cockpit/login", login=b"admin:foobar")
        self.assertIn("HTTP/1.1 200 OK\r\n", response)
        self.assertIn("Set-Cookie: cockpit", response)

        # socket handler; this should refuse HEAD (as it makes little sense on sockets), so 404
        response = head("/cockpit/socket")
        self.assertIn("HTTP/1.1 404 Not Found\r\n", response)

        # external channel handler; unauthenticated, thus 404
        response = head("/cockpit+123/channel/foo")
        self.assertIn("HTTP/1.1 404 Not Found\r\n", response)

    @testlib.skipWsContainer("ssh root login not allowed")
    @testlib.nondestructive
    def testFlowControlDownload(self):
        # Read /dev/vda through the speed playground for a while, then make sure that the
        # process whose pid the page reports did not grow beyond a fixed bound.
        machine = self.machine
        browser = self.browser

        self.login_and_go("/playground/speed", user="root", enable_root_login=True)
        browser.wait_text_not("#pid", "")
        reported_pid = browser.text("#pid")

        browser.set_val("#read-path", "/dev/vda")
        browser.click("#read-sideband")

        # wait until data flows, then let it run for a while
        browser.wait_text_not("#speed", "")
        time.sleep(20)

        statm = machine.execute(f"cat /proc/{reported_pid}/statm")
        # NOTE(review): per proc(5) the first statm field is the total program size in
        # pages, the resident set is the second one; confirm which was intended here
        first_field = statm.split(" ")[0]
        pages = int(first_field)

        # This fails when flow control is not present
        self.assertLess(pages, 250000)

    @testlib.nondestructive
    def testFlowControlUpload(self):
        # Let the speed playground feed data into a spawned process, then check size and
        # tail of the resulting /tmp/spawninput file.
        machine = self.machine
        browser = self.browser
        target = "/tmp/spawninput"

        self.login_and_go("/playground/speed")
        self.addCleanup(machine.execute, f"rm -f {target}")
        browser.click("#spawn-input")

        browser.wait_text_not("#speed", "")
        # the transfer gets an extended timeout of 60
        with browser.wait_timeout(60):
            browser.wait_text("#spawn-input-result", "success")

        size = int(machine.execute(f"stat -c %s {target}"))
        self.assertEqual(size, 65536000)
        last_bytes = machine.execute(f"tail -c10 {target}").strip()
        self.assertEqual(last_bytes, "aaaaaaaaaa")

    @testlib.skipWsContainer("no cockpit-ws binary with cockpit/ws container")
    @testlib.nondestructive
    def testLocalSession(self):
        # Run cockpit-ws with --local-session, once letting it spawn the bridge itself and
        # once attached to an already running bridge, and check the session each time.
        machine = self.machine
        ws = self.ws_executable

        def check_session():
            """Open a session without logging in, re-connect after the idle timeout, then kill ws."""
            machine.wait_for_cockpit_running('localhost', 9090)
            browser = self.browser
            # session works directly, no login page
            browser.open("/")
            browser.wait_visible('#content')
            browser.enter_page("/system")
            browser.switch_to_top()

            # disconnect browser, and wait past ws session timeout
            browser.kill()
            time.sleep(17)  # cockpit_ws_service_idle plus a bit
            self.browser = self.new_browser()
            browser = self.browser

            # can start new session
            browser.open("/")
            browser.wait_visible('#content')

            # shut it down, wait until it is gone
            machine.execute("pkill cockpit-ws; while pgrep -a cockpit-ws; do sleep 1; done")
            browser.wait_in_text(".curtains-ct", "Disconnected")

        # start ws with --local-session, let it spawn bridge; ensure that this works without /etc/cockpit/
        machine.stop_cockpit()
        spawn_bridge_command = (
            f"su - -c 'G_MESSAGES_DEBUG=all XDG_CONFIG_DIRS=/usr/local {ws} --no-tls "
            "--local-session=cockpit-bridge' admin"
        )
        machine.spawn(spawn_bridge_command, "cockpit-ws-local")
        check_session()

        # start ws with --local-session and existing running bridge
        script_lines = [
            "#!/bin/bash -eu",
            "coproc env G_MESSAGES_DEBUG=all cockpit-bridge",
            "export G_MESSAGES_DEBUG=all",
            "export XDG_CONFIG_DIRS=/usr/local",
            # connect stdin/stdout of ws to the file descriptors of the coprocess
            f"{ws} --no-tls --local-session=- <&${{COPROC[0]}} >&${{COPROC[1]}}",
        ]
        self.write_file("/tmp/local.sh", "\n".join(script_lines) + "\n", perm="755")
        machine.spawn("su - -c /tmp/local.sh admin", "local.sh")
        check_session()

        self.allow_journal_messages("couldn't register polkit authentication agent.*")

    @testlib.skipWsContainer("no cockpit-ws binary with cockpit/ws container")
    @testlib.skipImage("Kernel does not allow user namespaces", "debian-*")
    @testlib.nondestructive
    def testCockpitDesktop(self):
        # Run cockpit-desktop as 'admin' with $BROWSER pointing to curl (or a wrapper
        # script), and check the page that the "browser" downloaded to /tmp/out.html.
        m = self.machine
        m.stop_cockpit()

        # each case is a tuple of (cockpit-desktop arguments to try, strings that every
        # resulting page has to contain)
        cases = [(['/cockpit/@localhost/system/index.html', 'system', 'system/index', 'system/'],
                  ['id="overview"']
                  ),
                 (['/cockpit/@localhost/network/firewall.html', 'network/firewall'],
                  ['div id="firewall"', 'script src="firewall.js"']
                  ),
                 (['/cockpit/@localhost/playground/react-patterns.html', 'playground/react-patterns'],
                  ['script src="react-patterns.js"']
                  ),
                 # no ssh host
                 (['/cockpit/@localhost/manifests.json'],
                  ['"system"', '"Overview"']
                  ),
                 # remote ssh host
                 (['/cockpit/@localhost/manifests.json test1@localhost'],
                  ['"system"', '"Overview"', '"HACK"']
                  )
                 ]

        # prepare fake ssh target; to verify that we really use that, fake dashboard manifest
        m.execute("""useradd test1
                  [ -f ~admin/.ssh/id_rsa ] || su -c "ssh-keygen -t rsa -N '' -f ~/.ssh/id_rsa" admin
                  mkdir -p ~test1/.ssh ~test1/.local/share/cockpit/dashboard
                  echo '{ "version": "42", "dashboard": { "index": { "label": "HACK" } } }' > ~test1/.local/share/cockpit/dashboard/manifest.json
                  cp ~admin/.ssh/id_rsa.pub ~test1/.ssh/authorized_keys
                  ssh-keyscan localhost >> ~admin/.ssh/known_hosts
                  chown admin:admin ~admin/.ssh/known_hosts
                  chown -R test1:test1 ~test1
                  su -c "ssh test1@localhost cockpit-bridge --packages" admin | grep -q test1.*dashboard  # validate setup
                  """)

        for (pages, asserts) in cases:
            for page in pages:
                # curl plays the browser and stores the page in /tmp/out.html
                m.execute(f"""su - -c 'BROWSER="curl --silent --compressed -o /tmp/out.html" {self.libexecdir}/cockpit-desktop {page}' admin""",
                          timeout=10)

                out = m.execute("cat /tmp/out.html")
                for a in asserts:
                    self.assertIn(a, out)

                # every single run has to clean up after itself
                self.assertNoAdminProcessLeaks()

        # cockpit-desktop can start a privileged bridge through polkit
        # we don't have an agent, so just allow the privilege without interactive authentication
        if m.image == "arch":
            # Arch gets a JavaScript rule in rules.d, all other images a .pkla file
            self.write_file("/etc/polkit-1/rules.d/test.rules", r"""
polkit.addRule(function(action, subject) {
    if (action.id == "org.cockpit-project.cockpit.root-bridge" && subject.user == "admin")
        return polkit.Result.YES;
}); """)
        else:
            self.write_file("/etc/polkit-1/localauthority/50-local.d/test.pkla", r"""
[Testing without an agent]
Identity=unix-user:admin
Action=org.cockpit-project.cockpit.root-bridge
ResultAny=yes
ResultInactive=yes
ResultActive=yes""")

        # "browser" wrapper: download the page like above, but only return once a
        # /usr/bin/cockpit-bridge process (optionally run through python) shows up
        self.write_file("/tmp/browser.sh", """#!/bin/sh -e
curl --silent --compressed -o /tmp/out.html "$@"
# wait until privileged bridge starts
until pgrep -f '^(/usr/[^ ]+/[^ /]*python[^ /]* )?/usr/bin/cockpit-bridge'; do sleep 1; done
""")
        m.execute("chmod 755 /tmp/browser.sh")
        m.execute(f"su - -c 'BROWSER=/tmp/browser.sh {self.libexecdir}/cockpit-desktop system' admin", timeout=10)
        self.assertIn('id="overview"', m.execute("cat /tmp/out.html"))

        self.assertNoAdminProcessLeaks()

        # journal messages which are allowed to appear during this test
        self.allow_journal_messages("couldn't register polkit authentication agent.*")
        self.allow_journal_messages("Refusing to render service to dead parents.")
        self.allow_journal_messages(".*No authentication agent found.*")
        self.allow_journal_messages(".*Peer failed to perform TLS handshake.*")
        self.allow_journal_messages(r".*cannot reauthorize identity\(s\): unix-user:.*")
        self.allow_journal_messages("admin: Executing command .*COMMAND=.*cockpit-bridge --privileged.*")

    @testlib.skipBeiboot("test does not make sense in beiboot mode")
    def testReverseProxy(self):
        # Put cockpit-ws (listening on port 9099) behind socat proxies, a TLS one on port
        # 9090 and a plain one on port 9091, and compare --no-tls with --for-tls-proxy.
        m = self.machine
        b = self.browser

        self.ostree_setup_ws()

        # set up a poor man's reverse TLS proxy with socat
        m.upload(["../test/data/mock-server.crt", "../test/data/mock-server.key"], "/tmp")
        m.spawn("socat OPENSSL-LISTEN:9090,reuseaddr,fork,cert=/tmp/mock-server.crt,"
                "key=/tmp/mock-server.key,verify=0 TCP:localhost:9099",
                "socat-tls.log")

        # and another proxy for plain http
        m.spawn("socat TCP-LISTEN:9091,reuseaddr,fork TCP:localhost:9099", "socat.log")

        # ws with plain --no-tls should fail after login with mismatching Origin (expected http, got https)
        m.execute("systemctl start cockpit-session.socket")
        m.spawn(f"runuser -u cockpit-session-socket -- {self.ws_executable} --no-tls -p 9099",
                "ws-notls.log")
        m.wait_for_cockpit_running(tls=True)

        # log in through the TLS proxy
        b.open(f"https://{b.address}:{b.port}/system")
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')

        b.wait_text(".early-failure .pf-v5-c-empty-state__title", "Connection failed")
        b.wait_in_text(".early-failure ", "unexpected error while connecting to the machine")

        # the rejection has to show up in the log of the spawned ws
        testlib.wait(lambda: m.execute("grep 'received request from bad Origin' /var/log/ws-notls.log"))

        # sanity check: unencrypted http through SSL proxy does not work
        m.execute("! curl http://localhost:9090")

        # does not redirect to https (through plain http proxy)
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9091"))
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://172.27.0.15:9091"))

        # stop the --no-tls instance and wait until it is gone
        m.execute("pkill -e cockpit-ws; while pgrep -a cockpit-ws; do sleep 1; done")
        # this page failure is really noisy
        self.allow_journal_messages(".*No authentication agent found.*")
        self.allow_journal_messages("couldn't register polkit authentication agent.*")
        self.allow_journal_messages("received request from bad Origin.*")
        self.allow_journal_messages(".*invalid handshake.*")
        self.allow_browser_errors(".*received unsupported version in init message.*")
        self.allow_browser_errors(".*received message before init.*")
        self.allow_browser_errors("Error reading machine id")

        # ws with --for-tls-proxy accepts only https origins, thus should work
        m.spawn(f"runuser -u cockpit-session-socket -- {self.ws_executable} --for-tls-proxy -p 9099 -a 127.0.0.1",
                "ws-fortlsproxy.log")
        m.wait_for_cockpit_running(tls=True)
        b.open(f"https://{b.address}:{b.port}/system")
        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.wait_visible('#content')
        b.enter_page("/system")
        # cookie should be marked as secure, as for the browser it's https
        cookie = b.cookie("cockpit")
        self.assertTrue(cookie["httpOnly"])
        self.assertTrue(cookie["secure"])
        b.logout()
        # deleted cookie after logout should be marked as secure
        cookie = b.cookie("cockpit")
        self.assertEqual(cookie["value"], "deleted")
        self.assertTrue(cookie["httpOnly"])
        self.assertTrue(cookie["secure"])

        # should have https:// URLs in Content-Security-Policy
        out = m.execute("curl --insecure --head https://localhost:9090/")
        self.assertIn("Content-Security-Policy: connect-src 'self' https://localhost:9090 wss://localhost:9090;", out)

        # sanity check: does not redirect to https (through plain http proxy) -- this isn't a supported mode, though!
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://127.0.0.1:9091"))
        self.assertIn("HTTP/1.1 200 OK", m.execute("curl --silent --head http://172.27.0.15:9091"))

    @testlib.nondestructive
    @testlib.skipBeiboot("beiboot doesn't use systemd units")
    def testCaCert(self):
        # /ca.cer must fail without 0-self-signed-ca.pem in ws-certs.d, and must return
        # the content of that file once it exists.
        machine = self.machine
        ca_file = "/etc/cockpit/ws-certs.d/0-self-signed-ca.pem"
        remove_ca = f"rm -f {ca_file}"
        fetch_ca = "curl -sfS http://localhost:9090/ca.cer"

        # force cert regeneration
        machine.execute("systemctl stop cockpit; rm -f /etc/cockpit/ws-certs.d/*")
        machine.start_cockpit()
        if not machine.ws_container:
            # Really start Cockpit to make sure it has generated all its certificates.
            machine.execute("systemctl start cockpit")

        # Start without a CA certificate.
        self.addCleanup(machine.execute, remove_ca)
        machine.execute(remove_ca)
        machine.execute(f"! {fetch_ca}")

        # Now make one up and check that it is served.
        fake_cert = "FAKE CERT FOR TESTING\n"
        machine.write(ca_file, fake_cert)
        self.assertEqual(machine.execute(fetch_ca), fake_cert)

    @testlib.nondestructive
    def testBranding(self):
        # Check the branding files served below /cockpit/static/ and the brand text on
        # the login page.
        machine = self.machine
        machine.start_cockpit()
        image = machine.image

        # for all of our CI images, the part before the dash is the name of the
        # subdirectory in /usr/share/cockpit/branding.
        brand = image.partition('-')[0]
        branddir = f'/usr/share/cockpit/branding/{brand}'

        # We don't have access to the stuff on the filesystem if it's living in
        # the container.
        if not machine.ws_container:
            # make sure that there are no broken links in "our" directory
            broken_links = machine.execute(f'find {branddir} -xtype l')
            self.assertEqual(broken_links, '')

            # branding.css undergoes variable substitution based on the content of
            # /usr/lib/os-release.  Perform the substitution for ourselves for
            # validation.  envsubst comes from gettext.
            os_release = machine.execute("cat /usr/lib/os-release")
            os_release_vars = os_release.replace('\n', ' ')
            machine.execute(f'{os_release_vars} envsubst < {branddir}/branding.css > /tmp/branding.ref')
            self.addCleanup(machine.execute, "rm /tmp/branding.ref")

        # fetch some files and make sure they match what we expect
        def curl_and_compare(name, content_type=None, reference=None):
            """Download static file `name`, optionally checking its Content-Type.

            Unless ws lives in the container, the download is compared with
            `reference`, which defaults to the file of that name in the
            branding directory.
            """
            url = f'http://localhost:9090/cockpit/static/{name}'
            download = f'/tmp/{name}'
            if not reference:
                reference = f'{branddir}/{name}'

            # Check that the expected content type is served
            if content_type is not None:
                response_headers = machine.execute(f'curl --head {url}')
                self.assertIn(f'Content-Type: {content_type}', response_headers)

            # Check that we can fetch the file
            machine.execute(f'curl --fail -o {download} {url}')
            self.addCleanup(machine.execute, f"rm {download}")

            # compare that it matches what we expected
            if not machine.ws_container:
                machine.execute(f'cmp {download} {reference}')

        # some brands miss the images, but the OSes we CI have them all
        curl_and_compare('branding.css', 'text/css', reference='/tmp/branding.ref')
        curl_and_compare('logo.png', 'image/png')
        curl_and_compare('favicon.ico')

        # do a pixel test to make sure everything looks like we expect
        browser = self.browser
        browser.open("/system")
        browser.wait_visible("#login")
        browser.assert_pixels("body", "login-screen")

        # our supported OSes should all have branding; make sure it matches os-release
        if image.startswith("ubuntu"):
            # reflects src/branding/ubuntu/branding.scss
            name_variables = '$PRETTY_NAME'
        else:
            name_variables = '$NAME $VARIANT'
        expected = machine.execute(f'. /usr/lib/os-release; echo "{name_variables}"').strip()
        # quirk: trailing space hard to avoid due to the empty variant wrapped in <b>
        if image == 'arch':
            expected += ' '

        browser.wait_text("#brand", expected)

    @testlib.skipOstree("tests cockpit-ws package")
    @testlib.skipWsContainer("tests cockpit-ws package")
    @testlib.nondestructive
    def testAuthDirectSession(self):
        # Log in while cockpit-session.socket is masked and cockpit.conf points the
        # [Basic] command directly at the cockpit-session executable.
        machine = self.machine
        browser = self.browser
        libexecdir = self.libexecdir

        machine.stop_cockpit()
        # make sure that this doesn't use the session socket
        machine.execute('systemctl stop cockpit-session.socket; systemctl mask cockpit-session.socket')
        self.addCleanup(machine.execute, 'systemctl unmask cockpit-session.socket')

        config = (
            "\n"
            "[Negotiate]\n"
            "Action=none\n"
            "\n"
            "[Basic]\n"
            f"Command={libexecdir}/cockpit-session\n"
        )
        machine.write('/etc/cockpit/cockpit.conf', config)

        # cockpit-ws has to run as root for this, as cockpit-session isn't suid root any more
        machine.execute(f"systemd-run --unit=custom-ws.service {libexecdir}/cockpit-ws --no-tls")
        self.addCleanup(machine.execute, "systemctl stop custom-ws.service")
        machine.wait_for_cockpit_running()

        browser.login_and_go(None)
        browser.wait_text('#current-username', 'admin')
        browser.logout()

        # Arch logs this to the journal
        self.allow_journal_messages("cockpit-session:.*Activate the web console with.*")

    @testlib.skipWsContainer("no local config with cockpit/ws")
    @testlib.nondestructive
    def testXdgConfig(self):
        # Point $XDG_CONFIG_DIRS to alternative directories and check that the
        # configuration and certificates from there are used instead of /etc/cockpit.
        xdg_env = f"XDG_CONFIG_DIRS={self.vm_tmpdir}/xdg:/etc/test-xdg\n"
        m = self.machine
        # drop-in in service.d, which sets the variable in the environment of services
        self.write_file("/etc/systemd/system/service.d/xdg.conf",
                        f"[Service]\nEnvironment={xdg_env}\n",
                        post_restore_action="systemctl daemon-reload")
        m.execute("systemctl daemon-reload")

        # move the existing certificates into the second directory of the search path
        m.execute("mkdir -p /etc/test-xdg/cockpit; mv /etc/cockpit/ws-certs.d /etc/test-xdg/cockpit/")
        self.addCleanup(m.execute, "rm -rf /etc/test-xdg")
        m.write("/etc/test-xdg/cockpit/cockpit.conf",
                "[WebService]\nLoginTitle=ExDeeGee\nAllowUnencrypted=true\n")

        # make sure this is not used
        m.write("/etc/cockpit/cockpit.conf",
                "[WebService]\nLoginTitle=NoNoNo\nAllowUnencrypted=false\n")
        m.start_cockpit(tls=True)

        # https works
        out = m.execute("curl --insecure --head https://localhost:9090/")
        self.assertIn("HTTP/1.1 200 OK", out)
        self.assertIn("Set-Cookie: cockpit", out)

        # http from non-localhost works
        b = self.browser
        b.open("/system")
        b.wait_visible("#login")
        # the title comes from the config below /etc/test-xdg
        b.wait_text("#server-name", "ExDeeGee")

        # uses the expected certs
        self.assertEqual(m.execute("readlink /run/cockpit/tls/server/key.source").strip(),
                         "/etc/test-xdg/cockpit/ws-certs.d/0-self-signed.key")
        # and did not generate new ones
        self.assertNotIn("ws-certs.d", m.execute("ls /etc/cockpit"))
        # properly iterates over dirs, ignores the one without files
        self.assertNotIn("xdg", m.execute(f"ls {self.vm_tmpdir}"))

        # bridge respects it too
        self.write_file("/etc/environment", f"{xdg_env}\n", append=True)
        # override which relabels the "services" menu entry
        m.write("/etc/test-xdg/cockpit/systemd.override.json", """{
    "menu": {
        "services": { "label": "Hackices" }
    }
}""")
        self.login_and_go(None)
        b.wait_visible("#nav-system li:contains(Hackices)")
        self.assertFalse(b.is_present("#nav-system li:contains(Services)"))
        b.logout()

        # still starts with a broken config (no section headers)
        m.write("/etc/test-xdg/cockpit/cockpit.conf", "LoginTitle=NoNoNO\n")
        m.execute("systemctl stop cockpit")
        self.login_and_go(None)
        b.wait_visible("#nav-system li:contains(Hackices)")
        self.assertFalse(b.is_present("#nav-system li:contains(Services)"))
        b.logout()
        # message from cockpit-ws
        self.allow_journal_messages(".*cockpit.conf: key=val line not in any section.*")
        # message from bridge
        self.allow_journal_messages(".*cockpit.conf is invalid: File contains no section headers.*")
        self.allow_journal_messages(".*cockpit.conf.*line: 1")
        self.allow_journal_messages(".*LoginTitle=NoNoNO.*")

    @testlib.nondestructive
    @testlib.skipBeiboot("no cockpit-bridge binary on host in beiboot mode")
    def testBridgeCLI(self):
        # Smoke test of the cockpit-bridge command line options --help, --version,
        # --packages and --bridges.
        machine = self.machine

        def bridge(option):
            """Return the output of cockpit-bridge called with the single `option`."""
            return machine.execute(f"cockpit-bridge {option}")

        help_text = bridge("--help")
        # this needs to work with both the C and Python bridges
        self.assertRegex(help_text, r"[uU]sage:")
        self.assertIn("--packages", help_text)

        version_info = bridge("--version")
        self.assertIn("Protocol: 1", version_info)
        self.assertRegex(version_info, r"Version: \d{3,}")

        package_list = bridge("--packages")
        self.assertRegex(package_list, r"(^|\n)base1\s+.*/usr/share/cockpit/base1")
        # also includes menu and tools entries
        self.assertRegex(package_list, r"(^|\n)system\s.*Services.*Terminal.*\s/usr/share/cockpit/systemd")

        # --bridges prints JSON; there has to be an entry named "sudo"
        bridge_list = json.loads(bridge("--bridges").strip())
        names = [entry['name'] for entry in bridge_list]
        self.assertGreater(len(bridge_list), 0)
        self.assertIn('sudo', names)


class TestReverseProxy(testlib.MachineCase):
    """Check that Cockpit works behind an nginx reverse proxy

    nginx listens with TLS on port 443 of the test machine and proxies all
    requests to cockpit on 127.0.0.1:9090. The tests reach nginx through the
    port forward declared in `provision`.
    """

    provision = {
        "0": {"forward": {"443": 8443}}
    }

    # nginx server block shared by all tests. Placeholders:
    #   origin:         value for server_name
    #   root_directive: either empty, or a complete "root ...;" line including
    #                   its indentation and trailing newline
    #   upstream:       URL that requests get proxied to
    # The "#" lines inside the string are nginx comments and end up in the
    # written config file.
    NGINX_CONF_TEMPLATE = """
server {
    listen 443 ssl;
    server_name %(origin)s;
%(root_directive)s
    ssl_certificate "/etc/pki/alice.pem";
    ssl_certificate_key "/etc/pki/alice.key";

    location / {
        # Required to proxy the connection to Cockpit
        proxy_pass %(upstream)s;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # Required for web sockets to function
        proxy_http_version 1.1;
        proxy_buffering off;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";

        # Pass ETag header from Cockpit to clients.
        # See: https://github.com/cockpit-project/cockpit/issues/5239
        gzip off;
    }
}
"""

    def setUp(self):
        """Prepare firewall, nginx certificate, cockpit.conf and SELinux for proxying"""
        super().setUp()
        m = self.machine

        m.execute("if firewall-cmd --state >/dev/null 2>&1; then firewall-cmd --add-service https; fi")

        # certificate and key which nginx presents to clients
        m.upload(["../src/tls/ca/alice.pem", "../src/tls/ca/alice.key"], "/etc/pki")

        # accept the forwarded address as origin, and take client address and
        # protocol from the headers which the nginx configuration sets
        m.write("/etc/cockpit/cockpit.conf", """[WebService]
Origins = https://%(origin)s wss://%(origin)s
ForwardedForHeader = X-Forwarded-For
ProtocolHeader = X-Forwarded-Proto
""" % {"origin": m.forward["443"]}, append=True)

        m.execute("setsebool -P httpd_can_network_connect on")
        # these tests are running the cockpit-ws binary directly, not via cockpit.socket
        m.execute("systemctl start cockpit-session.socket")
        self.allow_journal_messages("audit.*bool=httpd_can_network_connect.*val=1.*")

    def _proxy_address(self):
        """Return (host, port) of the forwarded nginx HTTPS port, both as strings"""
        host, port = self.machine.forward["443"].split(':')
        return host, port

    def _write_nginx_config(self, upstream, root=None):
        """Write /etc/nginx/conf.d/cockpit.conf from NGINX_CONF_TEMPLATE

        upstream: URL for proxy_pass, e.g. "http://127.0.0.1:9090"
        root: optional directory for a server-level "root" directive; the
              directive is left out entirely when this is None
        """
        m = self.machine
        root_directive = "" if root is None else f"    root {root};\n"
        m.write("/etc/nginx/conf.d/cockpit.conf", self.NGINX_CONF_TEMPLATE % {
            "origin": m.forward["443"],
            "root_directive": root_directive,
            "upstream": upstream,
        })

    def callProxyCurl(self, path, *args):
        """Fetch `path` from the proxy with curl on the test host

        Additional `args` are passed to curl as options. Returns the combined
        stdout and stderr of curl as bytes; as --verbose is used, this includes
        the TLS handshake details and response headers. Raises
        subprocess.CalledProcessError if curl exits with non-zero status.
        """
        # should use nginx' certificate, not cockpit's; use --resolve so that SNI matches the certificate's CN
        (https_host, https_port) = self._proxy_address()
        return subprocess.check_output(
            ["curl", "--verbose",
             "--resolve", f"alice:{https_port}:{https_host}",
             "--cacert", os.path.join(testlib.TEST_DIR, "../src/tls/ca/ca.pem"),
             *args,
             f"https://alice:{https_port}{path}"],
            stderr=subprocess.STDOUT)

    def checkCockpitOnProxy(self, urlroot="", login=True):
        """Check that Cockpit is usable through the proxy, with curl and a browser

        urlroot: URL path prefix under which Cockpit is served ("" for none)
        login: whether to expect and fill in the login page; False for setups
               which go straight to the session
        """
        b = self.new_browser()

        out = self.callProxyCurl(f"{urlroot}/cockpit/static/login.html", "--head")
        self.assertIn(b"HTTP/1.1 200 OK", out)
        self.assertIn(b"subject: CN=alice; DC=COCKPIT", out)

        # works with browser (but we can't set our CA)
        (https_host, https_port) = self._proxy_address()
        b.open(f"https://{https_host}:{https_port}{urlroot}/system")
        if login:
            b.wait_visible("#login")
            b.set_val("#login-user-input", "admin")
            b.set_val("#login-password-input", "foobar")
            b.click('#login-button')
        b.wait_visible('#content')
        # Verify that the urlRoot is applied to links in the navbar.
        b.wait_visible(f'#host-apps a[href="{urlroot}/system"]')

        if login:
            # check that we show up in the system logs from the browser's IP, not the proxy's
            self.assertIn('(172.27.0.2)', self.machine.execute('who'))
        b.logout()

    @testlib.skipImage("nginx not installed", "centos-*", "rhel-*", "debian-*", "ubuntu-*", "arch")
    @testlib.skipOstree("nginx not installed")
    def testNginxTLS(self):
        """test proxying to Cockpit with TLS

        As described on https://github.com/cockpit-project/cockpit/wiki/Proxying-Cockpit-over-NGINX
        This use case is important for proxying a remote machine.
        """
        m = self.machine

        # root is needed for serving the non-cockpit embedding files further down
        self._write_nginx_config("https://127.0.0.1:9090", root="/srv/www")

        m.execute("systemctl start nginx")
        m.start_cockpit(tls=True)
        self.checkCockpitOnProxy()

        # avoid crash when stopping the unit: https://bugzilla.redhat.com/show_bug.cgi?id=2304015
        m.execute("nginx -s stop")

        # now test with UrlRoot
        m.write("/etc/cockpit/cockpit.conf", "UrlRoot = cockpit-root\n", append=True)
        m.execute("systemctl stop cockpit.service")
        self.sed_file("s_location /_location /cockpit-root_", "/etc/nginx/conf.d/cockpit.conf")
        m.execute("systemctl start nginx")
        self.checkCockpitOnProxy(urlroot="/cockpit-root")

        # get a non-cockpit file from the server
        m.execute("mkdir -p /srv/www/embed-cockpit")
        m.upload(["verify/files/embed-cockpit/index.html",
                  "verify/files/embed-cockpit/embed.js",
                  "verify/files/embed-cockpit/embed.css"],
                 "/srv/www/embed-cockpit/")
        m.execute("if selinuxenabled 2>&1; then chcon -R -t httpd_sys_content_t /srv/www; fi")

        out = self.callProxyCurl("/embed-cockpit/embed.css")
        self.assertIn(b"HTTP/1.1 200 OK", out)
        self.assertIn(b"#embed-links", out)

        # embedding
        b = self.browser
        (https_host, https_port) = self._proxy_address()
        b.open(f"https://{https_host}:{https_port}/embed-cockpit/index.html")
        b.set_val("#embed-address", f"https://{https_host}:{https_port}/cockpit-root")
        b.click("#embed-full")
        b.wait_visible("iframe[name='embed-full'][loaded]")
        b.switch_to_frame("embed-full")

        b.wait_visible("#login")
        b.set_val("#login-user-input", "admin")
        b.set_val("#login-password-input", "foobar")
        b.click('#login-button')
        b.wait_visible('.pf-v5-c-card.system-health')

    @testlib.skipImage("nginx not installed", "centos-*", "rhel-*", "debian-*", "ubuntu-*", "arch")
    @testlib.skipOstree("nginx not installed")
    def testNginxNoTLS(self):
        """test proxying to Cockpit with plain HTTP

        This can be done when nginx and cockpit run on the same machine.
        """
        m = self.machine

        self._write_nginx_config("http://127.0.0.1:9090")

        m.execute("systemctl start nginx")

        def run_ws(extra_opts=""):
            # start cockpit-ws directly on localhost and wait until it accepts connections
            m.spawn(
                f"runuser -u cockpit-session-socket -- {self.libexecdir}/cockpit-ws "
                f"--address=127.0.0.1 --for-tls-proxy {extra_opts}", "ws.log")
            m.wait_for_cockpit_running()

        def kill_ws():
            # wait until the process is really gone, so that the next run_ws() can bind the port
            m.execute("pkill cockpit-ws; while pgrep -a cockpit-ws; do sleep 1; done")

        # start cockpit-ws in proxy mode, skip all the ws-certs.d/ steps
        run_ws()
        self.checkCockpitOnProxy()
        kill_ws()

        # works also without the login page (krb, oauth, --local-session, etc.)
        run_ws("--local-session=cockpit-bridge")
        self.checkCockpitOnProxy(login=False)
        kill_ws()

        # avoid crash when stopping the unit: https://bugzilla.redhat.com/show_bug.cgi?id=2304015
        m.execute("nginx -s stop")

        # UrlRoot + login page
        m.write("/etc/cockpit/cockpit.conf", "UrlRoot = myroot\n", append=True)
        self.sed_file("s_location /_location /myroot_", "/etc/nginx/conf.d/cockpit.conf")
        m.execute("systemctl start nginx")
        run_ws()
        self.checkCockpitOnProxy(urlroot="/myroot")
        kill_ws()

        # UrlRoot without login page
        run_ws("--local-session=cockpit-bridge")
        self.checkCockpitOnProxy(urlroot="/myroot", login=False)
        kill_ws()

        self.allow_journal_messages("couldn't register polkit authentication agent.*")
        self.allow_journal_messages("couldn't change to runtime dir.*Permission denied")


# When executed as a script (rather than imported), hand over to testlib's
# test_main() entry point.
if __name__ == '__main__':
    testlib.test_main()
